package com.example.demo.service.impl;

import com.example.demo.common.constant.CommonConstants;
import com.example.demo.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;

/**
 * kafka消费者Service
 * @date： 2021/5/19
 * @author: wbx
 */
@Component
@Slf4j
public class KafakaListenerService {
    @KafkaListener(topics = {CommonConstants.KAFKA_TOPIC})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> optional = Optional.ofNullable( record.value() );
        if (optional.isPresent()) {
            Object msg = optional.get();
            log.info( "record:{}", record );
            log.info("消费消息：" + msg + "==时间：" + DateUtil.getCurrentDateStr(DateUtil.C_TIME_PATTON_DEFAULT));
        }
    }
}
